Skip to main content
Version: 1.0.7

Contextual-Bandits using Vowpal Wabbit

In the contextual bandit problem, a learner repeatedly observes a context, chooses an action, and observes a loss/cost/reward for the chosen action only. Contextual bandit algorithms use additional side information (or context) to aid real world decision-making. They work well for choosing actions in dynamic environments where options change rapidly, and the set of available actions is limited.

An in-depth tutorial can be found here

Azure Personalizer emits logs in DSJSON-format. This example demonstrates how to perform off-policy evaluation.

Step1: Read the dataset

import pyspark.sql.types as T
from pyspark.sql import functions as F

schema = T.StructType(
[
T.StructField("input", T.StringType(), False),
]
)

df = (
spark.read.format("text")
.schema(schema)
.load("wasbs://publicwasb@mmlspark.blob.core.windows.net/decisionservice.json")
)
# print dataset basic info
print("records read: " + str(df.count()))
print("Schema: ")
df.printSchema()
display(df)

Step 2: Use VowpalWabbitFeaturizer to convert data features into vector

from synapse.ml.vw import VowpalWabbitDSJsonTransformer

df_train = (
VowpalWabbitDSJsonTransformer()
.setDsJsonColumn("input")
.transform(df)
.withColumn("splitId", F.lit(0))
.repartition(2)
)

# Show structured nature of rewards
df_train.printSchema()

# exclude JSON to avoid overflow
display(df_train.drop("input"))

Step 3: Train model

VowpalWabbitGeneric performs these steps:

  • trains a model for each split (=group)
  • synchronizes accross partitions after every split
  • store the 1-step ahead predictions in the model
from synapse.ml.vw import VowpalWabbitGeneric

model = (
VowpalWabbitGeneric()
.setPassThroughArgs(
"--cb_adf --cb_type mtr --clip_p 0.1 -q GT -q MS -q GR -q OT -q MT -q OS --dsjson --preserve_performance_counters"
)
.setInputCol("input")
.setSplitCol("splitId")
.setPredictionIdCol("EventId")
.fit(df_train)
)

Step 4: Predict and evaluate

df_predictions = model.getOneStepAheadPredictions()  # .show(5, False)
df_headers = df_train.drop("input")

df_headers_predictions = df_headers.join(df_predictions, "EventId")
display(df_headers_predictions)
from synapse.ml.vw import VowpalWabbitCSETransformer

metrics = VowpalWabbitCSETransformer().transform(df_headers_predictions)

display(metrics)

For each field of the reward column the metrics are calculated

per_reward_metrics = metrics.select("reward.*")

display(per_reward_metrics)